1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.http.server.httpserver;
12 import kiss.logger;
13 import collie.codec.http.session.httpsession;
14 import collie.codec.http.httptansaction;
15 import collie.codec.http.server.httpserveroptions;
16 import collie.codec.http.httpmessage;
17 import collie.codec.http.server.requesthandler;
18 import collie.codec.http.codec.httpcodec;
19 import collie.codec.http.httptansaction;
20 import collie.bootstrap.server;
21 import kiss.container.Vector;
22 import collie.channel;
23 import kiss.net.TcpStream;
24 import kiss.event;
25 import kiss.net.TcpListener;
26 import kiss.event.socket;
27 
28 import collie.net.server.tcpserver;
29 import collie.net.server.connection;
30 import collie.bootstrap.exception;
31 import collie.bootstrap.exception;
32 import collie.bootstrap.serversslconfig;
33 
34 import std.socket;
35 import std.parallelism;
36 
// Pipeline instantiation shared by the pipeline-based server and its factories.
// NOTE(review): the two template arguments are presumably the inbound and
// outbound data types of the pipeline — confirm against collie.channel.
alias HTTPPipeline = Pipeline!(const(ubyte[]), StreamWriteBuffer);
// Server variant whose listeners are ServerBootstrap!HTTPPipeline instances (UsePipeline = true).
alias HTTPServer = HTTPServerImpl!true;
// Server variant whose listeners are plain TCPServer instances (UsePipeline = false).
alias HttpServer = HTTPServerImpl!false;
40 
/**
 * HTTP server front end: owns the listening servers, the main event loop and
 * the optional worker loop group, and serves as the HTTPSessionController for
 * the sessions created from accepted connections.
 *
 * Params:
 *   UsePipeline = when true every listener is a ServerBootstrap!HTTPPipeline;
 *                 when false every listener is a plain TCPServer.
 */
final class HTTPServerImpl(bool UsePipeline) : HTTPSessionController
{
	static if(UsePipeline){
		alias Server = ServerBootstrap!HTTPPipeline;
	} else {
		alias Server = TCPServer;
	}
	alias SVector = Vector!(Server);
	alias IPVector = Vector!(HTTPServerOptions.IPConfig);

	/**
	 * Creates the main event loop and, when more than one thread is requested,
	 * a worker EventLoopGroup.
	 *
	 * Params:
	 *   options = server configuration; `options.threads` counts the main loop,
	 *             so `threads - 1` worker loops are requested.
	 *
	 * Throws: SSLException (only when built with version USE_SSL) if an SSL
	 *         configuration is present but no SSL context can be generated.
	 */
	this(HTTPServerOptions options)
	{
		version(USE_SSL){
			if(options.ssLConfig){
				_ssl_Ctx = options.ssLConfig.generateSSLCtx();
				if(_ssl_Ctx is null)
					throw new SSLException("can not generate SSL_Ctx!");
			}
		}
		_options = options;
		_mainLoop = new EventLoop();
		// Guard the subtraction: with an unsigned `threads` of 0 the old
		// `threads - 1` wrapped around to a huge value and silently created
		// a worker group. Zero (or one) thread now means "main loop only".
		size_t thread = _options.threads > 0 ? _options.threads - 1 : 0;
		if(thread > 0) {
			// More workers than CPUs were requested: fall back to totalCPUs-1.
			if(thread>totalCPUs) thread = totalCPUs-1;
			_group = new EventLoopGroup(cast(uint)thread);
		}

	}

	/**
	 * Replaces the set of bound addresses. Every existing listener is stopped
	 * and dropped, then one listener set is created per entry of `addrs`.
	 * Does nothing once the server has been started.
	 *
	 * Params:
	 *   addrs = addresses (with per-address options) to listen on.
	 */
	void bind(ref IPVector addrs)
	{
		if(_isStart) return;
		_ipconfigs = addrs;
		for(size_t i = 0; i < _servers.length; ++i)
		{
			// This loop tears the previous listeners down; the message used to
			// claim the opposite ("start listen!!!").
			logDebug("stop listen!!!");
			static if(UsePipeline)
				_servers[i].stopListening();
			else
				_servers[i].close();
		}
		_servers.clear();
		for(size_t i = 0; i < _ipconfigs.length; ++i)
		{
			newServer(_ipconfigs[i]);
		}
	}

	/**
	 * Adds one more address to listen on, keeping the existing listeners.
	 * Does nothing once the server has been started.
	 *
	 * Params:
	 *   addr = address (with per-address options) to listen on.
	 */
	void addBind(ref HTTPServerOptions.IPConfig addr)
	{
		// logDebug("",_isStart);
		if(_isStart) return;
		newServer(addr);
		_ipconfigs.insertBack(addr);
	}

	/**
	 * Starts every listener, starts the worker group (if any) and then runs
	 * the main event loop on the calling thread. A second call while the
	 * started flag is set returns immediately.
	 */
	void start()
	{
		// logDebug("start ",_isStart);
		if(_isStart) return;
		_isStart = true;
		for(size_t i = 0; i < _servers.length; ++i)
		{
			static if(UsePipeline)
				_servers[i].startListening();
			else {
				Server ser = _servers[i];
				ser.startTimeout(cast(uint)_options.timeOut);
				ser.listen(1024);
			}
		}
		if(_group)
			_group.start();
		_mainLoop.run();
	}

	/**
	 * Stops the worker group (if any) and the main event loop. Does nothing
	 * if the server was never started.
	 *
	 * NOTE(review): the started flag is not cleared here, so a later start(),
	 * bind() or addBind() is a no-op — confirm whether restart is meant to be
	 * supported before changing this.
	 */
	void stop()
	{
		if(!_isStart) return;
		if(_group)
			_group.stop();
		_mainLoop.stop();
	}

	/// The address configurations registered through bind() / addBind().
	ref const(IPVector) addresses() const{ return _ipconfigs;}
	/// The main event loop created by the constructor.
	EventLoop eventLoop(){return _mainLoop;}
	/// The worker loop group, or null when no worker threads were requested.
	EventLoopGroup group(){return _group;}
	/// The listeners created so far.
	ref const(SVector) servers(){return _servers;}
protected:
	/**
	 * Builds the handler for a new transaction by folding the request through
	 * all configured handler factories: each factory receives the handler
	 * produced by the previous one (null for the first) together with `msg`.
	 *
	 * Returns: a RequestHandlerAdaptor bound to `txn` wrapping the final
	 *          handler, or null when the chain produced no handler.
	 */
	override HTTPTransactionHandler getRequestHandler(HTTPTransaction txn, HTTPMessage msg)
	{/*  will run  in Multi-thread */
		RequestHandler req = null;
		for(size_t i = 0; i < _options.handlerFactories.length; ++i)
		{
			req = _options.handlerFactories[i](req,msg);
		}
		if(req is null)
			return null;
		RequestHandlerAdaptor ada = new RequestHandlerAdaptor(req);
		ada.setTransaction(txn);
		return ada;
	}

	/// Session-attached hook; intentionally empty.
	override void attachSession(HTTPSession session){/*  will run  in Multi-thread */}

	/// Session-detached hook; intentionally empty.
	override void detachSession(HTTPSession session){/*  will run  in Multi-thread */}

	/// Codec-changed hook; intentionally empty.
	override void onSessionCodecChange(HTTPSession session){/*  will run  in Multi-thread */}

	/// The configured maximum header size, narrowed to uint.
	uint maxHeaderSize() const shared {return cast(uint)_options.maxHeaderSize;}

	static if(UsePipeline){
		/**
		 * Applies per-address socket options to a freshly created acceptor.
		 * On Linux, when TCP Fast Open is enabled for the address, socket
		 * option 23 (TCP_FASTOPEN) is set to the configured queue size.
		 */
		static void setAcceptorConfig(ref shared(HTTPServerOptions.IPConfig) config,TcpListener acceptor)
		{
			version(linux) {
				if(config.enableTCPFastOpen){
					acceptor.setOption(SocketOptionLevel.TCP,cast(SocketOption)23,config.fastOpenQueueSize);
				}
			}
		}
	}

	/**
	 * Creates and binds the listener(s) for one address and records them in
	 * `_servers`.
	 *
	 * Pipeline mode: a single ServerBootstrap on the main loop, with port
	 * reuse enabled when a worker group exists.
	 * TCPServer mode: one TCPServer on the main loop and, when a worker group
	 * exists, one additional TCPServer per worker loop, all with port reuse.
	 */
	void newServer(HTTPServerOptions.IPConfig ipconfig )
	{
		static if(UsePipeline){
			Server ser = new Server(_mainLoop);
			if(_group)
				ser.setReusePort(true);
			ser.group(_group).childPipeline(new shared ServerHandlerFactory(this));
			version(USE_SSL){
				if(_options.ssLConfig)
					ser.setSSLConfig(_options.ssLConfig);
			}
			ser.pipeline(new shared ServerAccpeTFactory(ipconfig));
			ser.heartbeatTimeOut(cast(uint)_options.timeOut);
			ser.bind(ipconfig.address);
			logDebug("binding on: ", ipconfig.address.toString());
			_servers.insertBack(ser);
		} else {
			bool ruseport = _group !is null;
			_servers.insertBack(newTCPServer(_mainLoop,ipconfig.address,ruseport,ipconfig.enableTCPFastOpen,ipconfig.fastOpenQueueSize));
			if(ruseport){
				foreach(EventLoop loop; _group){
					_servers.insertBack(newTCPServer(loop,ipconfig.address,ruseport,ipconfig.enableTCPFastOpen,ipconfig.fastOpenQueueSize));
				}
			}

		}
	}
	static if(!UsePipeline){
		/**
		 * Creates a TCPServer on `loop`, wires its new-connection callback to
		 * newConnect and binds it to `address`, configuring the listening
		 * socket inside the bind callback.
		 *
		 * Params:
		 *   loop              = event loop the server runs on.
		 *   address           = address to bind.
		 *   ruseport          = enable port reuse on the listener; when false
		 *                       the listener gets SO_EXCLUSIVEADDRUSE on
		 *                       Windows and REUSEADDR elsewhere.
		 *   enableTCPFastOpen = on Linux, set socket option 23 (TCP_FASTOPEN).
		 *   fastOpenQueueSize = value passed for the fast-open option.
		 *
		 * Returns: the bound (not yet listening) server.
		 */
		Server newTCPServer(EventLoop loop,Address address,bool ruseport, bool enableTCPFastOpen, uint fastOpenQueueSize )
		{
			Server ser = new Server(loop);
			ser.setNewConntionCallBack(&newConnect);
			logDebug("binding on: ", address.toString());

			ser.bind(address,(TcpListener accpet) @trusted {
					if(ruseport)
						accpet.reusePort(true);
					else {
						// The predefined version identifier is `Windows`;
						// the previous lowercase `windows` was never defined,
						// so this branch was silently skipped on Windows.
						version(Windows){
							import core.sys.windows.winsock2;
							accpet.setOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_EXCLUSIVEADDRUSE,true);
						} else {
							accpet.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
						}
					}
					version(linux) {
						if(enableTCPFastOpen){
							accpet.setOption(SocketOptionLevel.TCP,cast(SocketOption)23,fastOpenQueueSize);
						}
					}
				});
			return ser;
		}
	}


	/**
	 * New-connection callback for the TCPServer-based variant: wraps the
	 * accepted stream in an HttpHandlerConnection that uses this server as
	 * controller and a downstream HTTP/1.x codec limited to the configured
	 * maximum header size.
	 */
	ServerConnection newConnect(TcpListener sender, TcpStream stream) 
	{
		return new HttpHandlerConnection(stream,this,
			new HTTP1XCodec(TransportDirection.DOWNSTREAM,cast(uint)_options.maxHeaderSize));
	}
private:
	SVector _servers;                 // every listener created by newServer
	EventLoop _mainLoop;              // loop run by start() on the calling thread
	EventLoopGroup _group = null;     // worker loops; null when none were requested


	HTTPServerOptions _options;
	IPVector _ipconfigs;              // addresses registered via bind()/addBind()
	SSL_CTX * _ssl_Ctx = null;        // only assigned when built with version USE_SSL

	bool _isStart = false;            // set by start(); never cleared in this class
}
236 
237 
238 private:
239 
240 import collie.codec.http.codec.http1xcodec;
241 import collie.codec.http.session.httpdownstreamsession;
242 import collie.codec.http.session.sessiondown;
243 
/**
 * HTTPConnection that installs an HTTPDownstreamSession as its HTTP session
 * at construction time.
 */
class HttpHandlerConnection : HTTPConnection
{
	/**
	 * Params:
	 *   sock       = accepted stream, forwarded to the base connection.
	 *   controller = session controller handed to the downstream session.
	 *   codec      = codec handed to the downstream session.
	 */
	this(TcpStream sock,HTTPSessionController controller,HTTPCodec codec)
	{
		super(sock);
		// The session is created with this connection as its third argument.
		auto downstream = new HTTPDownstreamSession(controller, codec, this);
		httpSession = downstream;
	}
}
252 
/**
 * PipelineSessionDown that installs an HTTPDownstreamSession as its HTTP
 * session at construction time.
 */
class HttpHandlerPipeline : PipelineSessionDown
{
	/**
	 * Params:
	 *   controller = session controller handed to the downstream session.
	 *   codec      = codec handed to the downstream session.
	 */
	this(HTTPSessionController controller,HTTPCodec codec)
	{
		// The session is created with this handler as its third argument.
		auto downstream = new HTTPDownstreamSession(controller, codec, this);
		httpSession(downstream);
	}
}
260 
/**
 * Pipeline factory for accepted connections of the pipeline-based server:
 * every new pipeline gets a TCPSocketHandler followed by an
 * HttpHandlerPipeline controlled by the owning server.
 */
class ServerHandlerFactory : PipelineFactory!HTTPPipeline
{
	/**
	 * Params:
	 *   server = owning server; stored after a cast to the field's own type.
	 */
	this(HTTPServer server)
	{
		alias ServerField = typeof(_server);
		_server = cast(ServerField)server;
	}

	/**
	 * Builds the pipeline for one accepted stream.
	 *
	 * Params:
	 *   transport = the accepted stream.
	 *
	 * Returns: the finalized pipeline.
	 */
	override HTTPPipeline newPipeline(TcpStream transport) {
		auto pipeline = HTTPPipeline.create();
		pipeline.addBack(new TCPSocketHandler(transport));

		// Downstream HTTP/1.x codec limited to the server's maximum header size.
		auto codec = new HTTP1XCodec(TransportDirection.DOWNSTREAM, _server.maxHeaderSize);
		auto httpHandler = new HttpHandlerPipeline(cast(HTTPServer)_server, codec);
		pipeline.addBack(httpHandler);

		pipeline.finalize();
		return pipeline;
	}

private:
	HTTPServer _server;
}
279 
/**
 * Accept-pipeline factory of the pipeline-based server: creates an empty
 * AcceptPipeline and applies the per-address socket configuration to the
 * acceptor it was created for.
 */
class ServerAccpeTFactory : AcceptPipelineFactory
{
	/**
	 * Params:
	 *   config = per-address configuration; stored after a cast to the
	 *            field's own type.
	 */
	this(HTTPServerOptions.IPConfig config)
	{
		alias ConfigField = typeof(_conf);
		_conf = cast(ConfigField)config;
	}

	/**
	 * Params:
	 *   acceptor = listener that receives the socket options.
	 *
	 * Returns: a newly created accept pipeline with no handlers added here.
	 */
	override AcceptPipeline newPipeline(TcpListener acceptor) {
		logDebug("--new accpetPipeLine");
		auto acceptPipe = AcceptPipeline.create();
		// Socket options are applied to the acceptor, not to the pipeline.
		HTTPServer.setAcceptorConfig(_conf, acceptor);
		return acceptPipe;
	}

private:
	HTTPServerOptions.IPConfig _conf;
}